A sales forecast is a prediction of future sales revenue based on historical data, industry trends, and the status of the current sales pipeline. Businesses use the sales forecast to estimate weekly, monthly, quarterly, and annual sales totals. A company needs to make an accurate sales forecast as it adds value across an organization and helps the different verticals to chalk out their future course of action.
Forecasting helps an organization plan its sales operations by region and provides valuable insights to the supply chain team regarding the procurement of goods and materials. An accurate sales forecast process has many benefits which include improved decision-making about the future and reduction of sales pipeline and forecast risks. Moreover, it helps to reduce the time spent in planning territory coverage and establish benchmarks that can be used to assess trends in the future.
SuperKart is a retail chain operating supermarkets and food marts across various tier cities, offering a wide range of products. To optimize its inventory management and make informed decisions around regional sales strategies, SuperKart wants to accurately forecast the sales revenue of its outlets for the upcoming quarter.
To operationalize these insights at scale, the company has partnered with a data science firm—not just to build a predictive model based on historical sales data, but to develop and deploy a robust forecasting solution that can be integrated into SuperKart’s decision-making systems and used across its network of stores.
The data contains the different attributes of the various products and stores. The detailed data dictionary is given below.
#Installing the libraries with the specified versions
!pip install numpy==2.0.2 pandas==2.2.2 scikit-learn==1.6.1 matplotlib==3.10.0 seaborn==0.13.2 joblib==1.4.2 xgboost==2.1.4 requests==2.32.3 huggingface_hub==0.30.1 streamlit==1.43.2 shap -q
Note:
After running the above cell, kindly restart the notebook kernel (for Jupyter Notebook) or runtime (for Google Colab) and run all cells sequentially from the next cell.
On executing the above line of code, you might see a warning regarding package dependencies. This warning can be ignored, as the above code installs all necessary libraries and their dependencies at versions that are compatible with the code in this notebook.
import warnings
warnings.filterwarnings("ignore")
# Libraries to help with reading and manipulating data
import numpy as np
import pandas as pd
# For splitting the dataset
from sklearn.model_selection import train_test_split
# Libraries to help with data visualization
import matplotlib.pyplot as plt
import seaborn as sns
# Lib for displaying the updated dataset
from IPython.display import display
import torch
print("GPU Available:", torch.cuda.is_available())
# Removes the limit for the number of displayed columns
pd.set_option("display.max_columns", None)
# Sets the limit for the number of displayed rows
pd.set_option("display.max_rows", 100)
# Libraries for the different ensemble regressors
from sklearn.ensemble import (
BaggingRegressor,
RandomForestRegressor,
AdaBoostRegressor,
GradientBoostingRegressor,
)
from xgboost import XGBRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn import metrics
# Libraries to get different metric scores
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
r2_score,
mean_absolute_percentage_error,
)
# To create the pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder, StandardScaler, OneHotEncoder
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline,Pipeline
# To tune different models via cross-validated grid search
from sklearn.model_selection import GridSearchCV
# To serialize the model
import joblib
# os related functionalities
import os
# API request
import requests
# for hugging face space authentication to upload files
from huggingface_hub import login, HfApi, create_repo
import shap
# import from google drive
from google.colab import drive
# import secrets from google drive
from google.colab import userdata
drive.mount('/content/drive')
data_ = pd.read_csv('/content/drive/MyDrive/SuperKart.csv')
batch_test = pd.read_csv('/content/drive/MyDrive/SuperKart_Batch_.csv')
# Elementary-level tuning
data = data_.copy()
# Advanced tuning (binning + dummies)
xgb_data = data.copy()
display(data.nunique())
Product_Id - Insignificant as this is exactly unique per entry. No value in keeping it.
data.drop('Product_Id', axis=1, inplace=True)
xgb_data.drop('Product_Id', axis=1, inplace=True)
# select numerical columns from dataset
numerical_cols = [feature for feature in data.columns if data[feature].dtypes != 'O']
# select categorical columns from dataset
categorical_cols = [feature for feature in data.columns if data[feature].dtypes == 'O']
# let's also demarcate the numerical and categorical columns of the XGB copy
xgb_numerical_cols = [feature for feature in xgb_data.columns if xgb_data[feature].dtypes != 'O']
xgb_categorical_cols = [feature for feature in xgb_data.columns if xgb_data[feature].dtypes == 'O']
data.shape
display(data.info())
duplicate_percentage = data.duplicated().mean() * 100
print(f"Percentage of duplicate rows: {duplicate_percentage:.2f}%")
display(round(data.isnull().sum() / data.isnull().count() * 100, 2))
display(data.describe(include='number').T)
# let's check the proportion of the mode against the total value counts
df = data.describe(include='object').T
df['mode_proportion'] = df['freq'] / df['count']
display(df)
| Column | Mode | Proportion |
|---|---|---|
| Product_Sugar_Content | Low Sugar | 55.7% |
| Product_Type | Fruits and Vegetables | 14.3% |
| Store_Id | OUT004 | 53.3% |
| Store_Size | Medium | 68.8% |
| Store_Location_City_Type | Tier 2 | 71.5% |
| Store_Type | Supermarket Type2 | 53.4% |
display(data.head(25))
display(data.tail(25))
# function to review a combination of boxplots and histograms
def histogram_boxplot(data, feature, figsize=(15, 10), kde=False, bins=None):
"""
Boxplot and histogram combined
data: dataframe
feature: dataframe column
figsize: size of figure (default (15,10))
kde: whether to show the density curve (default False)
bins: number of bins for histogram (default None)
"""
f2, (ax_box2, ax_hist2) = plt.subplots(
nrows=2, # Number of rows of the subplot grid= 2
sharex=True, # x-axis will be shared among all subplots
gridspec_kw={"height_ratios": (0.25, 0.75)},
figsize=figsize,
) # creating the 2 subplots
sns.boxplot(
data=data, x=feature, ax=ax_box2, showmeans=True, color="violet"
) # boxplot will be created and a triangle will indicate the mean value of the column
sns.histplot(data=data, x=feature, kde=kde, ax=ax_hist2, bins=bins if bins else "auto")  # For histogram
ax_hist2.axvline(
data[feature].mean(), color="green", linestyle="--"
) # Add mean to the histogram
ax_hist2.axvline(
data[feature].median(), color="black", linestyle="-"
) # Add median to the histogram
# function to create labeled barplots
def labeled_barplot(data, feature, perc=False, n=None):
"""
Barplot with percentage at the top
data: dataframe
feature: dataframe column
perc: whether to display percentages instead of count (default is False)
n: displays the top n category levels (default is None, i.e., display all levels)
"""
total = len(data[feature]) # length of the column
count = data[feature].nunique()
if n is None:
plt.figure(figsize=(count + 2, 6))
else:
plt.figure(figsize=(n + 2, 6))
plt.xticks(rotation=90, fontsize=15)
ax = sns.countplot(
data=data,
x=feature,
palette="Paired",
order=data[feature].value_counts().index[:n],
)
for p in ax.patches:
if perc:
label = "{:.1f}%".format(
100 * p.get_height() / total
) # percentage of each class of the category
else:
label = p.get_height() # count of each level of the category
x = p.get_x() + p.get_width() / 2 # width of the plot
y = p.get_height() # height of the plot
ax.annotate(
label,
(x, y),
ha="center",
va="center",
size=12,
xytext=(0, 5),
textcoords="offset points",
) # annotate the percentage
plt.show() # show the plot
def bin_categorical(series, threshold=0.10):
value_counts = series.value_counts(normalize=True)
major = value_counts[value_counts >= threshold].index
return series.apply(lambda x: x if x in major else 'Other')
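As a quick sanity check, the rare-label collapsing behaves like this on a toy series (the helper is repeated here so the snippet runs standalone; the example values are illustrative only):

```python
import pandas as pd

def bin_categorical(series, threshold=0.10):
    value_counts = series.value_counts(normalize=True)
    major = value_counts[value_counts >= threshold].index
    return series.apply(lambda x: x if x in major else 'Other')

# 'Seafood' makes up 10% of rows; with a 20% threshold it collapses to 'Other'
s = pd.Series(['Fruits'] * 6 + ['Snacks'] * 3 + ['Seafood'] * 1)
binned = bin_categorical(s, threshold=0.20)
print(binned.value_counts())  # Fruits 6, Snacks 3, Other 1
```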
for col in numerical_cols:
display(data[col].value_counts(1))
for col in categorical_cols:
display(data[col].value_counts(1))
data['Product_Sugar_Content'] = data['Product_Sugar_Content'].replace({
'reg': 'Regular',
'REG': 'Regular',
'regular': 'Regular'
})
xgb_data['Product_Sugar_Content'] = xgb_data['Product_Sugar_Content'].replace({
'reg': 'Regular',
'REG': 'Regular',
'regular': 'Regular'
})
for col in numerical_cols:
histogram_boxplot(data, col)
for col in categorical_cols:
print("\n")
labeled_barplot(data, col)
print("\n")
target = 'Product_Store_Sales_Total'
def binning(series, bins=15):
return pd.cut(series, bins=bins)
for col in categorical_cols:
plt.figure(figsize=(14, 6))
sns.countplot(x=col, data=data, order=data[col].value_counts().index)
plt.title(f'Count of each category in {col}')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
for col in numerical_cols:
if col != target: # Avoid redundant plot if already visualized elsewhere
plt.figure(figsize=(14, 6))
binned_col = binning(data[col], bins=20)
sns.countplot(x=binned_col, data=data)
plt.title(f'Binned Count Distribution of {col}')
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()
Product_MRP and Product_Weight distributions are right-skewed, indicating the presence of premium or high-weight outliers. Store_Establishment_Year has a discrete spread, suggesting that store maturity might influence regional performance or inventory capacity. Product_Sugar_Content required label normalization (e.g., mapping 'reg' to 'Regular'), highlighting the importance of data consistency.
for col in numerical_cols:
if col != target:
plt.figure(figsize=(15, 6))
binned_col = binning(data[col])
sns.boxplot(x=binned_col, y=target, data=data)
plt.title(f'{col} (binned) vs {target}')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
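The right-skew called out above can be quantified with pandas' sample skewness. This is a standalone sketch using synthetic stand-ins for the SuperKart columns, so the exact values are illustrative only:

```python
import numpy as np
import pandas as pd

# Synthetic stand-ins for the SuperKart numeric columns (illustrative only)
rng = np.random.default_rng(33)
toy = pd.DataFrame({
    'Product_Weight': rng.gamma(shape=2.0, scale=3.0, size=1000),  # right-skewed
    'Product_Allocated_Area': rng.uniform(0, 1, size=1000),        # roughly symmetric
})

# Sample skewness: values well above 0 indicate a right tail
print(toy.skew())
```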
# Scatterplot
for col in numerical_cols:
if col != target:
plt.figure(figsize=(15, 6))
sns.scatterplot(x=col, y=target, data=data)
plt.title(f'{col} vs {target}')
plt.tight_layout()
plt.show()
Product_MRP and Product_Store_Sales_Total show a strong positive relationship, reinforcing the pricing impact on revenue. Product_Weight also correlates positively with revenue, implying that bulk or high-mass products contribute more to sales totals. Product types (e.g., Snack Foods, Household) vary widely in revenue impact, showing potential for strategic SKU prioritization. Store_Size and Store_Type appear to have stratified impacts on sales, though variance remains within category levels.
sns.set(rc={'figure.figsize':(16,10)})
sns.heatmap(data.corr(numeric_only = True),
annot=True,
linewidths=.5,
center=0,
cbar=False,
cmap="Spectral")
plt.show()
| Feature Pair | Correlation | Interpretation |
|---|---|---|
| Product_MRP ↔ Product_Store_Sales_Total | 0.79 | Strong positive linear correlation. Higher MRP is associated with higher total sales. Useful for prediction. |
| Product_Weight ↔ Product_Store_Sales_Total | 0.74 | Heavier products are linked with higher sales, likely due to bulk or premium status. |
| Product_Weight ↔ Product_MRP | 0.53 | Moderately correlated. Heavier items may be priced higher. Possible multicollinearity warning. |
| Feature Pair | Correlation | Observation |
|---|---|---|
| Product_Allocated_Area ↔ All Other Features | ~0.00 | No correlation. Likely independent. Low predictive value unless nonlinear. |
| Store_Establishment_Year ↔ Product_Store_Sales_Total | -0.19 | Weak negative correlation. Newer stores may show slightly lower sales. |
| Product_MRP ↔ Store_Establishment_Year | -0.19 | Weak negative relationship. Potentially negligible for modeling. |
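The possible multicollinearity between Product_Weight and Product_MRP can be checked with variance inflation factors. The notebook does not compute VIF; this is an optional diagnostic sketched with scikit-learn (no extra dependency) on synthetic stand-in features:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

def vif(df):
    """Variance inflation factor per column: 1 / (1 - R^2) of that
    column regressed on all the others. Values above ~5 suggest
    problematic multicollinearity."""
    out = {}
    for col in df.columns:
        X = df.drop(columns=[col])
        r2 = LinearRegression().fit(X, df[col]).score(X, df[col])
        out[col] = 1.0 / (1.0 - r2)
    return pd.Series(out)

# Synthetic stand-ins: 'mrp' is built from 'weight', 'area' is independent
rng = np.random.default_rng(0)
w = rng.normal(12, 3, 500)
mrp = 10 * w + rng.normal(0, 10, 500)
area = rng.uniform(0, 1, 500)
features = pd.DataFrame({'weight': w, 'mrp': mrp, 'area': area})
print(vif(features))  # weight/mrp inflated, area near 1
```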
Multicollinearity: Product_Weight and Product_MRP both strongly correlate with Product_Store_Sales_Total.
Feature Redundancy: Product_Weight and Product_MRP also correlate with each other (0.53), so they carry overlapping signal.
Model Implications: correlated predictors can split importance scores between features and inflate variance in linear models.
Feature Prioritization: Product_MRP and Product_Weight rank high; Product_Allocated_Area and Store_Establishment_Year rank low.
# Pairplot
# Slice the numeric columns for the pairplot (the target is a column-name string)
pairplot_data = data[numerical_cols].copy()
for col in pairplot_data.columns:
print(f"{col}: {pairplot_data[col].shape} | type: {type(pairplot_data[col].values)}")
# Boxplot
for col in categorical_cols:
plt.figure(figsize=(14, 6))
sns.boxplot(x=col, y=target, data=data)
plt.title(f'{col} vs {target}')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
plt.figure(figsize=(8,6))
plt.scatter(
x=pairplot_data['Product_Weight'],
y=pairplot_data['Product_MRP'],
c=pairplot_data['Product_Store_Sales_Total'],
cmap='viridis',
alpha=0.5
)
plt.colorbar(label='Sales Revenue')
plt.xlabel('Product Weight')
plt.ylabel('Product MRP')
plt.title('MRP vs Weight colored by Revenue')
plt.tight_layout()
plt.show()
# Create a categorical bin of our continuous target
pairplot_data['Revenue_Bin'] = pd.qcut(pairplot_data['Product_Store_Sales_Total'], q=4, labels=["Low", "Med-Low", "Med-High", "High"])
# Plot with color by bin
sns.pairplot(pairplot_data, diag_kind='kde', hue='Revenue_Bin', plot_kws={'alpha': 0.6})
plt.suptitle("Pairplot Colored by Revenue Bin", y=1.02)
plt.show()
The strongest multivariate interactions appear among Product_Weight, Product_MRP, and Product_Store_Sales_Total. Store_Establishment_Year does not show a strong multivariate interaction effect, aligning with its lower feature importance score in model evaluation.
# Define predictor matrix (X) using selected numeric and categorical features
X = data[numerical_cols + categorical_cols]
# Define target variable
y = data[target]
# Split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
X, y, # Predictors (X) and target variable (y)
test_size=0.2, # 20% of the data is reserved for testing
random_state=33 # Ensures reproducibility by setting a fixed random seed
)
# Create a preprocessing pipeline for numerical and categorical features
# Feature groups
numerical_columns = ['Product_Weight', 'Product_MRP']
onehot_cols = ['Product_Type', 'Store_Type', 'Store_Location_City_Type']
ordinal_cols = ['Store_Size', 'Product_Sugar_Content']
# Define custom orderings for ordinal encoding
store_size_order = ['Small', 'Medium', 'High']
sugar_content_order = ['No Sugar', 'Low Sugar', 'Regular']
ordinal_categories = [store_size_order, sugar_content_order]
# Define pipeline for numerical columns
numerical_pipeline = Pipeline([
('num_imputer', SimpleImputer(strategy='median')),
('scaler', StandardScaler())
])
# Define pipeline for ordinal columns
ordinal_pipeline = Pipeline([
('ord_imputer', SimpleImputer(strategy='most_frequent')),
('ordinal', OrdinalEncoder(categories=ordinal_categories))
])
# Define pipeline for one-hot columns
onehot_pipeline = Pipeline([
('cat_imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# Combine into a single ColumnTransformer
preprocessor = make_column_transformer(
(numerical_pipeline, numerical_columns),
(ordinal_pipeline, ordinal_cols),
(onehot_pipeline, onehot_cols)
)
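As a standalone check of the ordinal step: passing an explicit `categories` list pins the integer codes to the intended order ('Small' < 'Medium' < 'High'), independent of the order in which values appear in the data:

```python
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

# Explicit category order pins the codes: 'Small' -> 0, 'Medium' -> 1, 'High' -> 2
enc = OrdinalEncoder(categories=[['Small', 'Medium', 'High']])
df = pd.DataFrame({'Store_Size': ['High', 'Small', 'Medium']})
codes = enc.fit_transform(df)
print(codes.ravel())  # [2. 0. 1.]
```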
# function to compute adjusted R-squared
def adj_r2_score(predictors, targets, predictions):
r2 = r2_score(targets, predictions)
n = predictors.shape[0]
k = predictors.shape[1]
return 1 - ((1 - r2) * (n - 1) / (n - k - 1))
# function to compute different metrics to check performance of a regression model
def model_performance_regression(model, predictors, target):
"""
Function to compute different metrics to check regression model performance
model: regressor
predictors: independent variables
target: dependent variable
"""
# predicting using the independent variables
pred = model.predict(predictors)
r2 = r2_score(target, pred) # to compute R-squared
adjr2 = adj_r2_score(predictors, target, pred) # to compute adjusted R-squared
rmse = np.sqrt(mean_squared_error(target, pred)) # to compute RMSE
mae = mean_absolute_error(target, pred) # to compute MAE
mape = mean_absolute_percentage_error(target, pred) # to compute MAPE
# creating a dataframe of metrics
df_perf = pd.DataFrame(
{
"RMSE": rmse,
"MAE": mae,
"R-squared": r2,
"Adj. R-squared": adjr2,
"MAPE": mape,
},
index=[0],
)
return df_perf
The ML models to be built can be any two out of the following:
# Define base Random Forest model
rf_model = RandomForestRegressor(random_state=33)
# Create pipeline with preprocessing and Random Forest model
rf_pipeline = make_pipeline(preprocessor, rf_model)
# Train the model pipeline on the training data
rf_pipeline.fit(X_train, y_train)
rf_estimator_model_train_perf = model_performance_regression(rf_pipeline, X_train,y_train)
print("Training performance \n")
rf_estimator_model_train_perf
rf_estimator_model_test_perf = model_performance_regression(rf_pipeline, X_test,y_test)
print("Testing performance \n")
rf_estimator_model_test_perf
# Define base XGBoost model
xgb_model = XGBRegressor(random_state=33, tree_method='hist', device='cuda')
# Create pipeline with preprocessing and XGBoost model
xgb_pipeline = make_pipeline(preprocessor, xgb_model)
# Train the model pipeline on the training data
xgb_pipeline.fit(X_train, y_train)
xgb_estimator_model_train_perf = model_performance_regression(xgb_pipeline, X_train, y_train)
print("Training performance \n")
xgb_estimator_model_train_perf
xgb_estimator_model_test_perf = model_performance_regression(xgb_pipeline, X_test,y_test)
print("Testing performance \n")
xgb_estimator_model_test_perf
# Choose the type of regressor.
rf_tuned = RandomForestRegressor(random_state=33)
# Create pipeline with preprocessing and Random Forest model
rf_pipeline = make_pipeline(preprocessor, rf_tuned)
# Grid of parameters to choose from
parameters = {
'randomforestregressor__max_depth':[3, 4, 5, 6],
'randomforestregressor__max_features': ['sqrt','log2',None],
'randomforestregressor__n_estimators': [50, 75, 100, 125, 150]
}
# Type of scoring used to compare parameter combinations
scorer = metrics.make_scorer(metrics.r2_score)
# Run the grid search
grid_obj = GridSearchCV(rf_pipeline, parameters, scoring=scorer,cv=5)
grid_obj = grid_obj.fit(X_train, y_train)
# Set the regressor to the best combination of parameters
rf_tuned = grid_obj.best_estimator_
# Fit the best algorithm to the data.
rf_tuned.fit(X_train, y_train)
rf_tuned_model_train_perf = model_performance_regression(rf_tuned, X_train, y_train)
print("Training performance \n")
rf_tuned_model_train_perf
rf_tuned_model_test_perf = model_performance_regression(rf_tuned, X_test, y_test)
print("Testing performance \n")
rf_tuned_model_test_perf
# Get feature importances
rf_model_from_pipeline = rf_tuned.named_steps['randomforestregressor']
importances = rf_model_from_pipeline.feature_importances_
# Get the feature names from the preprocessor after transformation
# This is necessary because one-hot encoding creates new columns
preprocessor_step = rf_tuned.named_steps['columntransformer']
# Get the names of the transformed features
# The get_feature_names_out() method is available in newer versions of scikit-learn
try:
features_after_preprocessing = preprocessor_step.get_feature_names_out()
except AttributeError:
# Fallback for older scikit-learn versions if get_feature_names_out() is not available
# This requires inspecting the transformer steps and their output shapes
print("Warning: scikit-learn version might be old. Consider upgrading for get_feature_names_out().")
# Attempt to manually construct feature names (less robust)
numerical_features = preprocessor_step.transformers_[0][2]
ordinal_features = preprocessor_step.transformers_[1][2]
onehot_encoder = preprocessor_step.transformers_[2][1].named_steps['onehot']
onehot_feature_names = onehot_encoder.get_feature_names_out(preprocessor_step.transformers_[2][2])
features_after_preprocessing = np.concatenate([numerical_features, ordinal_features, onehot_feature_names])
# Ensure the number of feature names matches the number of importances
if len(features_after_preprocessing) != len(importances):
print(f"Mismatch: Found {len(features_after_preprocessing)} feature names but {len(importances)} importances.")
print("Check your preprocessing steps and scikit-learn version.")
# Handle the mismatch, maybe by raising an error or skipping plotting
# For now, we will proceed assuming the corrected feature names match importances
else:
features = features_after_preprocessing
# Create DataFrame for sorting
feat_imp_df = pd.DataFrame({
'Feature': features,
'Importance': importances
}).sort_values(by='Importance', ascending=False)
# Plot
plt.figure(figsize=(10, 6))
sns.barplot(x='Importance', y='Feature', data=feat_imp_df.head(15), palette='viridis')
plt.title("Top 15 Feature Importances (Random Forest)")
plt.tight_layout()
plt.show()
# Predictions and residuals
y_pred = rf_tuned.predict(X_test)
residuals = y_test - y_pred
# Residuals vs Predicted
plt.figure(figsize=(8, 6))
sns.scatterplot(x=y_pred, y=residuals, alpha=0.5)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel("Predicted Values")
plt.ylabel("Residuals")
plt.title("Residuals vs. Predicted Values")
plt.tight_layout()
plt.show()
# Distribution of residuals
plt.figure(figsize=(8, 6))
sns.histplot(residuals, kde=True, bins=30)
plt.axvline(0, color='red', linestyle='--')
plt.title("Distribution of Residuals")
plt.xlabel("Residual")
plt.tight_layout()
plt.show()
# Choose the type of regressor.
xgb_tuned = XGBRegressor(
random_state=33, tree_method='hist', device='cuda',
n_estimators=100, max_depth=6, learning_rate=0.1,
objective='reg:squarederror')
# Create pipeline with preprocessing and XGBoost model
xgb_pipeline = make_pipeline(preprocessor, xgb_tuned)
#Grid of parameters to choose from
param_grid = {
'xgbregressor__n_estimators': [50, 100, 150, 200], # number of trees to build
'xgbregressor__max_depth': [2, 3, 4], # maximum depth of each tree
'xgbregressor__colsample_bytree': [0.4, 0.5, 0.6], # fraction of attributes to be considered (randomly) for each tree
'xgbregressor__colsample_bylevel': [0.4, 0.5, 0.6], # fraction of attributes to be considered (randomly) for each level of a tree
'xgbregressor__learning_rate': [0.01, 0.05, 0.1], # learning rate
'xgbregressor__reg_lambda': [0.4, 0.5, 0.6], # L2 regularization factor
}
# Type of scoring used to compare parameter combinations
scorer = metrics.make_scorer(metrics.r2_score)
# Run the grid search
grid_obj = GridSearchCV(xgb_pipeline, param_grid, scoring=scorer,cv=5,n_jobs=-1)
grid_obj = grid_obj.fit(X_train, y_train)
# Set the regressor to the best combination of parameters
xgb_tuned = grid_obj.best_estimator_
# Fit the best algorithm to the data.
xgb_tuned.fit(X_train, y_train)
xgb_tuned_model_train_perf = model_performance_regression(xgb_tuned, X_train, y_train)
print("Training performance \n")
xgb_tuned_model_train_perf
xgb_tuned_model_test_perf = model_performance_regression(xgb_tuned, X_test, y_test)
print("Testing performance \n")
xgb_tuned_model_test_perf
xgb_final data copy
xgb_data['Product_MRP_bin'] = pd.qcut(xgb_data['Product_MRP'], q=5, duplicates='drop')
xgb_data['Product_Weight_bin'] = pd.qcut(xgb_data['Product_Weight'], q=5, duplicates='drop')
xgb_data['Product_Type_Binned'] = bin_categorical(xgb_data['Product_Type'])
xgb_data['Store_Type_Binned'] = bin_categorical(xgb_data['Store_Type'])
xgb_data['Store_Location_City_Type_Binned'] = bin_categorical(xgb_data['Store_Location_City_Type'])
# Now apply one-hot encoding to the original and newly binned categorical columns
xgb_data = pd.get_dummies(xgb_data, columns=[
'Product_Type_Binned', 'Store_Type_Binned', 'Store_Location_City_Type_Binned',
'Product_MRP_bin', 'Product_Weight_bin'
], drop_first=False)
print("Columns in xgb_data:")
print(xgb_data.columns.tolist())
print("Number of features after encoding:", xgb_data.shape[1])
# Identify new encoded columns
encoded_cols = [col for col in xgb_data.columns if '_Binned' in col or '_bin_' in col]
# Filter only columns with more than one unique value
plottable_cols = [col for col in encoded_cols if xgb_data[col].nunique() > 1]
print(f"{len(plottable_cols)} of {len(encoded_cols)} dummy features have more than one unique value.")
# Skip if nothing to plot
if not plottable_cols:
print("No dummy columns with variance to plot.")
else:
# Prepare dynamic layout
ncols = 3
nrows = (len(plottable_cols) + ncols - 1) // ncols
fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(5*ncols, 4*nrows))
axes = axes.flatten()
for idx, col in enumerate(plottable_cols):
sns.histplot(data=xgb_data, x=col, bins=2, ax=axes[idx])
axes[idx].set_title(f"{col} Distribution")
# Remove unused axes
for j in range(idx+1, len(axes)):
fig.delaxes(axes[j])
plt.tight_layout()
plt.show()
# View dummy-encoded columns
encoded_cols = [col for col in xgb_data.columns if '_Binned_' in col or '_bin_' in col]
print("New dummy-encoded features:")
print(encoded_cols)
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))
# Histogram + KDE
sns.histplot(data=xgb_data, x='Product_Store_Sales_Total', kde=True, bins=30, ax=axes[0])
axes[0].set_title("Distribution of Product_Store_Sales_Total")
axes[0].set_xlabel("Sales Total")
axes[0].set_ylabel("Frequency")
# Boxplot
sns.boxplot(data=xgb_data, x='Product_Store_Sales_Total', ax=axes[1])
axes[1].set_title("Boxplot of Product_Store_Sales_Total")
axes[1].set_xlabel("Sales Total")
plt.tight_layout()
plt.show()
# Confirm this is coming from the preprocessed dummy-encoded DataFrame
# When preparing data for training:
X = xgb_data.drop(columns=[
'Product_Store_Sales_Total',
'Product_Type', 'Store_Type', 'Store_Location_City_Type', # raw categoricals
'Product_MRP', 'Product_Weight' # raw numerics if binned
])
y = xgb_data['Product_Store_Sales_Total']
# Drop any residual object-type columns if they slipped in
X = X.select_dtypes(include=['number']).copy()
# Create binned version of target for stratification
y_binned = pd.qcut(y, q=10, labels=False, duplicates='drop') # Adjust `q` if needed
# Stratified train-test split (using binned target)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=33, stratify=y_binned
)
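A quick way to verify that decile-based stratification preserves the target distribution across the split (a synthetic skewed target stands in for Product_Store_Sales_Total here, so the numbers are illustrative only):

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Synthetic right-skewed target standing in for Product_Store_Sales_Total
rng = np.random.default_rng(33)
y = pd.Series(rng.lognormal(mean=8, sigma=0.5, size=2000))
X = pd.DataFrame({'f': rng.normal(size=2000)})

# Bin the continuous target into deciles, then stratify on the bins
y_binned = pd.qcut(y, q=10, labels=False, duplicates='drop')
X_tr, X_te, y_tr, y_te = train_test_split(
    X, y, test_size=0.2, random_state=33, stratify=y_binned
)

# Each decile contributes proportionally to both splits,
# so the train and test medians stay close
print(y_tr.median(), y_te.median())
```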
xgb_plus = XGBRegressor(
random_state=33, tree_method='hist', device='cuda',
n_estimators=100, max_depth=6, learning_rate=0.1,
objective='reg:squarederror')
# Create pipeline (no preprocessor since data is already processed)
xgb_pipeline = Pipeline([
('xgbregressor', xgb_plus)
])
# Run the grid search
grid_search = GridSearchCV(xgb_pipeline, param_grid, scoring=scorer,cv=5,n_jobs=-1)
grid_search = grid_search.fit(X_train, y_train)
# Best model
xgb_final = grid_search.best_estimator_
# Final training for safety
xgb_final.fit(X_train, y_train)
xgb_final_train_perf = model_performance_regression(xgb_final, X_train, y_train)
xgb_final_test_perf = model_performance_regression(xgb_final, X_test, y_test)
print("Train:")
display(xgb_final_train_perf)
print("Test:")
display(xgb_final_test_perf)
# Training performance comparison
models_train_comp_df = pd.concat(
[rf_estimator_model_train_perf.T,rf_tuned_model_train_perf.T,
xgb_estimator_model_train_perf.T,xgb_tuned_model_train_perf.T, xgb_final_train_perf.T],
axis=1,
)
models_train_comp_df.columns = [
"Random Forest Estimator",
"Random Forest Tuned",
"XGBoost",
"XGBoost Tuned",
"XGBoost Final",
]
print("Training performance comparison:")
models_train_comp_df
# Testing performance comparison
models_test_comp_df = pd.concat(
[rf_estimator_model_test_perf.T,rf_tuned_model_test_perf.T,
xgb_estimator_model_test_perf.T,xgb_tuned_model_test_perf.T, xgb_final_test_perf.T],
axis=1,
)
models_test_comp_df.columns = [
"Random Forest Estimator",
"Random Forest Tuned",
"XGBoost",
"XGBoost Tuned",
"XGBoost Final",
]
print("Testing performance comparison:")
models_test_comp_df
# Generalization gap: train-minus-test R-squared for each model
(models_train_comp_df - models_test_comp_df).iloc[2]
# Create a folder for storing the files needed for web app deployment
os.makedirs("/content/deployment_files/Model", exist_ok=True)
# Define the file path to save (serialize) the trained model along with the data preprocessing steps
saved_model_path = "/content/deployment_files/Model/store-sales-prediction-model-v1-0.joblib"
# Save the best trained model pipeline using joblib
joblib.dump(xgb_tuned, saved_model_path)
print(f"Model saved successfully at {saved_model_path}")
# Load the saved model pipeline from the file
saved_model = joblib.load("/content/deployment_files/Model/store-sales-prediction-model-v1-0.joblib")
# Confirm the model is loaded
print("Model loaded successfully.")
saved_model
# Google Colab secrets management
access_key = userdata.get('HF_TOKEN')
api = HfApi(token=access_key)
api.upload_folder(
folder_path="/content/deployment_files/Model",
repo_id="omoral02/RevenuePrediction",
repo_type="model",
)
os.makedirs("/content/deployment_files/Backend", exist_ok=True)
%%writefile /content/deployment_files/Backend/app.py
# Import necessary libraries
import numpy as np
import joblib
import pandas as pd
from flask import Flask, request, jsonify
from huggingface_hub import hf_hub_download
import tempfile
import streamlit as st
REPO_ID = "omoral02/RevenuePrediction"
FILENAME = "store-sales-prediction-model-v1-0.joblib"
# Write model to temp directory (writable in Hugging Face Spaces)
temp_dir = tempfile.gettempdir()
model_path = hf_hub_download(repo_id=REPO_ID, filename=FILENAME, cache_dir=temp_dir)
model = joblib.load(model_path)
# Initialize the Flask app
superkart_api = Flask("SuperKart Sales Predictor")
# def transform_input_for_model(df_raw):
# # Binning
# df_raw['Product_MRP_bin'] = pd.qcut(df_raw['Product_MRP'], q=5, duplicates='drop')
# df_raw['Product_Weight_bin'] = pd.qcut(df_raw['Product_Weight'], q=5, duplicates='drop')
# df_raw['Product_Type_Binned'] = bin_categorical(df_raw['Product_Type'])
# df_raw['Store_Type_Binned'] = bin_categorical(df_raw['Store_Type'])
# df_raw['Store_Location_City_Type_Binned'] = bin_categorical(df_raw['Store_Location_City_Type'])
# # Dummy encoding
# df_encoded = pd.get_dummies(df_raw, columns=[
# 'Product_Type_Binned', 'Store_Type_Binned',
# 'Store_Location_City_Type_Binned',
# 'Product_MRP_bin', 'Product_Weight_bin'
# ], drop_first=False)
# # Drop original fields
# df_encoded = df_encoded.select_dtypes(include=['number']).copy()
# return df_encoded
# The trained model is already loaded above via hf_hub_download and joblib
# Define root endpoint
@superkart_api.get('/')
def home():
return jsonify({"okay": "Welcome to the SuperKart Sales Prediction API!"})
# Endpoint for single record prediction
@superkart_api.post('/v1/predict')
def predict_sales():
if model is None:
return jsonify({"error": "Model not loaded"}), 500
try:
input_json = request.get_json()
expected_fields = [
'Product_Type', 'Store_Type', 'Store_Location_City_Type',
'Store_Size', 'Product_Sugar_Content', 'Product_Weight',
'Product_MRP', 'Product_Allocated_Area', 'Store_Establishment_Year'
]
missing = [f for f in expected_fields if f not in input_json]
if missing:
return jsonify({
'error': 'Missing required input fields.',
'missing_fields': missing,
'received_fields': list(input_json.keys())
}), 400
# Extract relevant inputs (must match training columns)
features = {
'Product_Type': input_json['Product_Type'],
'Store_Type': input_json['Store_Type'],
'Store_Location_City_Type': input_json['Store_Location_City_Type'],
'Store_Size': input_json['Store_Size'],
'Product_Sugar_Content': input_json['Product_Sugar_Content'],
'Product_Weight': input_json['Product_Weight'],
'Product_MRP': input_json['Product_MRP'],
'Product_Allocated_Area': input_json['Product_Allocated_Area'],
'Store_Establishment_Year': input_json['Store_Establishment_Year'],
}
input_df = pd.DataFrame([features])
# df_transformed = transform_input_for_model(input_df)
prediction = model.predict(input_df)[0]
return jsonify({'Predicted_Store_Sales_Total': round(float(prediction), 2)})
except Exception as e:
        print(f"Error during single prediction: {e}")  # Log the error for debugging
        return jsonify({"error": str(e), "message": "Prediction failed"}), 500
# Endpoint for batch prediction using CSV
@superkart_api.post('/v1/batch')
def predict_sales_batch():
try:
uploaded_file = request.files['file']
input_df = pd.read_csv(uploaded_file)
expected_fields = [
'Product_Type', 'Store_Type', 'Store_Location_City_Type',
'Store_Size', 'Product_Sugar_Content', 'Product_Weight',
'Product_MRP', 'Product_Allocated_Area', 'Store_Establishment_Year'
]
missing = [f for f in expected_fields if f not in input_df.columns]
if missing:
return jsonify({
'error': 'Missing required columns in uploaded CSV.',
'missing_columns': missing,
'received_columns': list(input_df.columns)
}), 400
# df_transformed = transform_input_for_model(input_df)
predictions = model.predict(input_df).tolist()
rounded_preds = [round(float(p), 2) for p in predictions]
return jsonify({'Predicted_Store_Sales_Total': rounded_preds})
# Optional: use product-store pair if available
# if 'Product_Id' in df_transformed.columns and 'Store_Id' in df_transformed.columns:
# keys = df_transformed['Product_Id'].astype(str) + "_" + df_transformed['Store_Id'].astype(str)
# else:
# keys = [f"row_{i}" for i in range(len(df_transformed))]
# return jsonify(dict(zip(keys, rounded_preds)))
except Exception as e:
        print(f"Error during batch prediction: {e}")  # Log the error for debugging
        return jsonify({"error": str(e), "message": "Prediction failed"}), 500
# Run the Flask app
if __name__ == '__main__':
superkart_api.run(host="0.0.0.0", port=7860)
%%writefile /content/deployment_files/Backend/requirements.txt
pandas==2.2.2
numpy==2.0.2
scikit-learn==1.6.1
xgboost==2.1.4
joblib==1.4.2
Werkzeug==2.2.2
flask==2.2.2
gunicorn==20.1.0
requests==2.28.1
uvicorn[standard]
huggingface_hub==0.20.3
streamlit==1.43.2
%%writefile /content/deployment_files/Backend/Dockerfile
# Base image
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Copy files
COPY . .
# Install dependencies
RUN pip install -r requirements.txt
# Expose port used by Flask
EXPOSE 7860
# Run the app
CMD ["python", "app.py"]
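Since `gunicorn` is already listed in `requirements.txt`, the `CMD` line could alternatively serve the app through a production WSGI server instead of Flask's development server. A possible variant, assuming the Flask app object in `app.py` is named `superkart_api`:

```dockerfile
# Alternative: serve the Flask app with gunicorn on the same port
CMD ["gunicorn", "-b", "0.0.0.0:7860", "app:superkart_api"]
```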
# Login to Hugging Face account using access token
login(token=access_key)
# Try to create the repository for the Hugging Face Space
try:
create_repo("RevenuePredictionBackend",
repo_type="space", # Specify the repository type as "space"
space_sdk="docker", # Specify the space SDK as "docker" to create a Docker space
private=False # Set to True if you want the space to be private
)
except Exception as e:
# Handle potential errors during repository creation
if "RepositoryAlreadyExistsError" in str(e):
print("Repository already exists. Skipping creation.")
else:
print(f"Error creating repository: {e}")
repo_id = "omoral02/RevenuePredictionBackend" # Hugging Face space id
# Login to Hugging Face platform with the access token
login(token=access_key)
# Initialize the API
api = HfApi()
# Upload the Flask backend files stored in deployment_files/Backend
api.upload_folder(
folder_path="/content/deployment_files/Backend", # Local folder path
repo_id=repo_id, # Hugging face space id
repo_type="space", # Hugging face repo type "space"
)
Creating Spaces and Adding Secrets in Hugging Face from Week 1
os.makedirs("/content/deployment_files/Frontend", exist_ok=True)
%%writefile /content/deployment_files/Frontend/app.py
import streamlit as st
import pandas as pd
import joblib
import numpy as np
import requests
# UI Title and Subtitle
st.title("🛒 SuperKart Sales Forecasting App")
st.write("This tool predicts **product-level revenue** in a specific store using historical and categorical inputs.")
# UI for Input Features
st.subheader("Enter Product & Store Details:")
# Categorical Inputs
product_type = st.selectbox("Product Type", [
"Meat", "Snack Foods", "Soft Drinks", "Dairy", "Household", "Fruits and Vegetables",
"Frozen Foods", "Breakfast", "Baking Goods", "Health and Hygiene", "Starchy Foods"
])
store_type = st.selectbox("Store Type", [
"Supermarket Type1", "Supermarket Type2", "Supermarket Type3", "Grocery Store"
])
city_type = st.selectbox("City Type", ["Tier 1", "Tier 2", "Tier 3"])
store_size = st.selectbox("Store Size", ["Small", "Medium", "High"])
sugar_content = st.selectbox("Product Sugar Content", ["No Sugar", "Low Sugar", "Regular"])
# Numerical Inputs
product_weight = st.number_input("Product Weight (kg)", min_value=0.0, max_value=50.0, value=10.0, step=0.1)
product_mrp = st.number_input("Product MRP", min_value=0.0, max_value=1000.0, value=200.0, step=1.0)
allocated_area = st.number_input("Allocated Display Area (0-1)", min_value=0.0, max_value=1.0, value=0.2, step=0.01)
store_est_year = st.number_input("Store Establishment Year", min_value=1950, max_value=2025, value=2010)
# Convert to DataFrame
input_data = pd.DataFrame({
'Product_Type': [product_type],
'Store_Type': [store_type],
'Store_Location_City_Type': [city_type],
'Store_Size': [store_size],
'Product_Sugar_Content': [sugar_content],
'Product_Weight': [product_weight],
'Product_MRP': [product_mrp],
'Product_Allocated_Area': [allocated_area],
'Store_Establishment_Year': [store_est_year],
})
# Make prediction when the "Predict" button is clicked
if st.button("Predict"):
response = requests.post("https://omoral02-RevenuePredictionBackend.hf.space/v1/predict", json=input_data.to_dict(orient='records')[0]) # Send data to Flask API
if response.status_code == 200:
prediction = response.json()['Predicted_Store_Sales_Total']
st.success(f"Predicted Revenue (in dollars): {prediction}")
else:
st.error("Error making prediction.")
# Section for batch prediction
st.subheader("Batch Prediction")
# Allow users to upload a CSV file for batch prediction
uploaded_file = st.file_uploader("Upload CSV file for batch prediction", type=["csv"])
# Make batch prediction when the "Predict Batch" button is clicked
if uploaded_file is not None:
if st.button("Predict Batch"):
response = requests.post("https://omoral02-RevenuePredictionBackend.hf.space/v1/batch", files={"file": uploaded_file}) # Send file to Flask API
if response.status_code == 200:
predictions = response.json()
st.success("Batch predictions completed!")
st.write(predictions) # Display the predictions
else:
st.error("Error making batch prediction.")
%%writefile /content/deployment_files/Frontend/requirements.txt
pandas==2.2.2
numpy==2.0.2
scikit-learn==1.6.1
xgboost==2.1.4
joblib==1.4.2
streamlit==1.43.2
requests==2.28.1
%%writefile /content/deployment_files/Frontend/Dockerfile
# Use a minimal base image with Python 3.9 installed
FROM python:3.9-slim
# Set the working directory inside the container to /app
WORKDIR /app
# Copy all files from the current directory on the host to the container's /app directory
COPY . .
# Install Python dependencies listed in requirements.txt
RUN pip3 install -r requirements.txt
# Define the command to run the Streamlit app on port 7860 and make it accessible externally
CMD ["streamlit", "run", "app.py", "--server.port=7860", "--server.address=0.0.0.0", "--server.enableXsrfProtection=false"]
# NOTE: Disable XSRF protection for easier external access in order to make batch predictions
# Login to Hugging Face account using access token
login(token=access_key)
# Try to create the repository for the Hugging Face Space
try:
    create_repo("RevenuePredictionFrontend", # One can replace "RevenuePredictionFrontend" with the desired space name
repo_type="space", # Specify the repository type as "space"
space_sdk="docker", # Specify the space SDK as "docker" to create a Docker space
private=False # Set to True if you want the space to be private
)
except Exception as e:
# Handle potential errors during repository creation
if "RepositoryAlreadyExistsError" in str(e):
print("Repository already exists. Skipping creation.")
else:
print(f"Error creating repository: {e}")
repo_id = "omoral02/RevenuePredictionFrontend" # Your Hugging Face space id
# Login to Hugging Face platform with the access token
login(token=access_key)
# Initialize the API
api = HfApi()
# Upload the Streamlit frontend files stored in deployment_files/Frontend
api.upload_folder(
folder_path="/content/deployment_files/Frontend", # Local folder path
repo_id=repo_id, # Hugging face space id
repo_type="space", # Hugging face repo type "space"
)
# Send a GET request to the root endpoint to verify the backend Space is live
try:
get = requests.get("https://omoral02-RevenuePredictionBackend.hf.space")
print(get.status_code)
print(get.headers)
print(get.json())
except Exception as e:
print(e)
sample = {
'Product_Type': 'Snack Foods',
'Store_Type': 'Supermarket Type1',
'Store_Location_City_Type': 'Tier 1',
'Store_Size': 'Medium',
'Product_Sugar_Content': 'Low Sugar',
'Product_Weight': 9.5,
'Product_MRP': 150.0,
'Product_Allocated_Area': 0.25,
'Store_Establishment_Year': 2010
}
try:
post = requests.post("https://omoral02-RevenuePredictionBackend.hf.space/v1/predict", json=sample)
print(post.status_code)
print(post.json())
except Exception as e:
print(e)
A test CSV file SuperKart_batch.csv was generated with 10 realistic entries based on the existing data schema; each row contains the nine required input fields. The batch file allows model inference to be evaluated via a POST request to the backend Flask API. A few of its rows are shown below:
| Product_Type | Store_Type | Store_Location_City_Type | Store_Size | Product_Sugar_Content | Product_Weight | Product_MRP | Product_Allocated_Area | Store_Establishment_Year |
|---|---|---|---|---|---|---|---|---|
| Snack Foods | Supermarket Type1 | Tier 2 | Medium | Low Sugar | 10.2 | 153 | 0.07 | 2009 |
| Health and Hygiene | Supermarket Type2 | Tier 2 | High | Regular | 13.8 | 182.6 | 0.12 | 2005 |
| Canned | Supermarket Type1 | Tier 2 | High | No Sugar | 11.4 | 143.8 | 0.10 | 1999 |
with open("/content/drive/MyDrive/SuperKart_Batch_.csv", "rb") as f:
response = requests.post(
"https://omoral02-RevenuePredictionBackend.hf.space/v1/batch",
files={"file": f}
)
print("Status Code:", response.status_code)
print("Response:", response.json())
Model Selection Justification: xgb_tuned
Performance Tradeoff:
While xgb_final (with binning and dummy variables) explored deeper feature engineering, it showed degraded generalization, with overfitting and a poorer R² on the test set.
xgb_tuned delivers balanced performance on both the train and test sets without overfitting, supported by robust regularization and tree-depth constraints.
✅ R² on Test: ~0.93
✅ MAPE: ~4.3%
✅ MAE: ~109
➤ These metrics suggest reliable and stable generalization, suitable for production.
- **Dynamic Inventory Planning**: Use predicted revenue per product to prioritize stock allocation by product category and store location type. High Product_MRP combined with specific Store_Types (e.g., Supermarket Type2 in Tier 2 cities) yields the highest ROI.
- **Product Portfolio Optimization**: Categories such as Dairy, Snack Foods, Meat, and Starchy Foods carry significant weight in the model's predictions; consider deeper segmentation within these categories for micro-forecasting.
- **Store Expansion Planning**: Leverage insights from Store_Size, Store_Establishment_Year, and Store_Location_City_Type. Older stores in Tier 2 regions consistently show higher normalized revenue per unit MRP, so prioritize medium-sized stores in Tier 2 cities for expansion.
- **Pricing and Promotion**: Products with higher Product_Allocated_Area and Product_MRP show nonlinear effects on revenue, which is useful for promotional bundling or pricing decisions.
xgb_tuned was serialized and integrated into a Flask API + Streamlit + Hugging Face deployment flow.
Batch and single-record POST endpoints are functional and input-schema compliant.
Future model refresh cycles can use GridSearchCV within a CI pipeline and SHAP logging for continuous drift monitoring.
SHAP analysis is expected to confirm the top drivers: Product_Type, Product_MRP, Store_Type, Store_Size, and Product_Sugar_Content.
These features consistently rank high in both model importance and business relevance.